嗨嗨,相信大家在昨天的文章中對 AST 和 logical plan 已經有了基礎的認識,今天就要來看看 DataFusion 是如何處理 SQL 語法的啦~ 讓我們開始吧!
DataFusion 採用三層架構設計,將查詢處理分為使用者介面、查詢規劃和實際執行三個層次,每層都有對應的 Context 負責管理狀態和資源:
SessionContext : 使用者介面層,提供高層次的 API,如 sql(), read_csv(), read_parquet() 等,同一個 SessionContext 下的查詢可以共享相同的配置和資源。
#[derive(Clone)]
pub struct SessionContext {
    /// UUID for the session
    session_id: String,
    /// Session start time
    session_start_time: DateTime<Utc>,
    /// Shared session state for the session
    state: Arc<RwLock<SessionState>>,
}
SessionState : 查詢計劃層, 包含查詢規劃所需的完整狀態,負責將 SQL 轉換為邏輯計劃和物理計劃,每個查詢都使用獨立的**SessionState**確保隔離性。
#[derive(Clone)]
pub struct SessionState {
		// ...
    /// Responsible for optimizing a logical plan
    optimizer: Optimizer,
    /// Responsible for optimizing a physical execution plan
    physical_optimizers: PhysicalOptimizer,
    /// Responsible for planning `LogicalPlan`s, and `ExecutionPlan`
    query_planner: Arc<dyn QueryPlanner + Send + Sync>,
    /// Scalar functions that are registered with the context
    scalar_functions: HashMap<String, Arc<ScalarUDF>>,
    /// Session configuration
    config: SessionConfig,
    /// Runtime environment
    runtime_env: Arc<RuntimeEnv>,
    // ... 
}
TaskContext: 是執行層的輕量級上下文,只包含執行執行計畫 (ExecutionPlan) 所需的最小狀態,支援並行執行和記憶體管理。
#[derive(Debug)]
pub struct TaskContext {
    /// Session Id
    session_id: String,
    /// Optional Task Identify
    task_id: Option<String>,
    /// Session configuration
    session_config: SessionConfig,
    /// Scalar functions associated with this task context
    scalar_functions: HashMap<String, Arc<ScalarUDF>>,
    /// Aggregate functions registered in the context
    aggregate_functions: HashMap<String, Arc<AggregateUDF>>,
    /// Window functions registered in the context
    window_functions: HashMap<String, Arc<WindowUDF>>,
    /// Runtime environment associated with this task context
    runtime: Arc<RuntimeEnv>,
}
而今日重點— SQL → AST → logical plan 的流程大部分都會集中在 SessionState 實作的各種方法中呦!
DataFusion 將 SQL 轉換為 logical plan 的過程分為三個階段:
首先創建  DFParserBuilder ,並調用一系列的方法初始化 DFParser 實例 (build),最後透過 DFParser 將 SQL 字串,轉換成 AST (Statement 物件)。
// datafusion/core/src/execution/session_states.rs
let mut statements = DFParserBuilder::new(sql)
        .with_dialect(dialect.as_ref())
        .with_recursion_limit(recursion_limit)
        .build()?
        .parse_statements()?;
觀察 build可以發現目標的 SQL 字串會先經過 Tokenizer 將關鍵字、識別符或運算符等字眼拆分成獨立的 token 放入 vector 並去除不需要的標點符號或空白符等。
// datafusion/sql/src/parser.rs
pub fn build(self) -> Result<DFParser<'a>, DataFusionError> {
    let mut tokenizer = Tokenizer::new(self.dialect, self.sql);
    // Convert TokenizerError -> ParserError
    let tokens = tokenizer
        .tokenize_with_location()
        .map_err(ParserError::from)?;
    Ok(DFParser {
        parser: Parser::new(self.dialect)
            .with_tokens_with_locations(tokens)
            .with_recursion_limit(self.recursion_limit),
        options: SqlParserOptions {
            recursion_limit: self.recursion_limit,
            ..Default::default()
        },
    })
}
隨後 parse_statements() 就會開始將 tokens 轉換成 AST 啦,一開始會用 peek_nth_token 偷看一下第一個 token 是甚麼,如果是關鍵字的話就會根據不同關鍵字有不同的處理方式:
pub fn parse_statement(&mut self) -> Result<Statement, DataFusionError> {
    match self.parser.peek_token().token {
        Token::Word(w) => {
            match w.keyword {
                Keyword::CREATE => {
                    self.parser.next_token(); // CREATE
                    self.parse_create()
                }
                Keyword::COPY => {
                    if let Token::Word(w) = self.parser.peek_nth_token(1).token {
                        // use native parser for COPY INTO
                        if w.keyword == Keyword::INTO {
                            return self.parse_and_handle_statement();
                        }
                    }
                    self.parser.next_token(); // COPY
                    self.parse_copy()
                }
                Keyword::EXPLAIN => {
                    self.parser.next_token(); // EXPLAIN
                    self.parse_explain()
                }
                _ => {
                    // use sqlparser-rs parser
                    self.parse_and_handle_statement()
                }
            }
        }
        _ => {
            // use the native parser
            self.parse_and_handle_statement()
        }
    }
}
parse_statement 完成後返回的 statements 便是解析完成的 AST,往外一層會發現我們剛剛觀察的範圍都在 create_logical_plan 內的 sql_to_statement,接著我們就繼續往下看 statement_to_plan
// datafusion/core/src/execution/session_states.rs
#[cfg(feature = "sql")]
    pub async fn create_logical_plan(
        &self,
        sql: &str,
    ) -> datafusion_common::Result<LogicalPlan> {
        let dialect = self.config.options().sql_parser.dialect.as_str();
        let statement = self.sql_to_statement(sql, dialect)?;
        let plan = self.statement_to_plan(statement).await?;
        Ok(plan)
    }
我們可以將 statement_to_plan 拆解成三個步驟:
Step1: 從 AST 中提取所有表格引用,為後續查找做準備**。**
// datafusion/core/src/execution/session_states.rs 
let references = self.resolve_table_references(&statement)?;
舉例來說假設查詢如下:
SELECT u.name, o.total 
FROM users u 
JOIN orders o ON u.id = o.user_id
就會提取出:["users", "orders"]
Step2: 建立 SessionContextProvider,並逐一解析和檢查表格中 Schema 和表格欄位的定義,此時如果發生表格或欄位不存在就會提早拋出錯誤。
 let mut provider = SessionContextProvider {
            state: self,
            tables: HashMap::with_capacity(references.len()),
        };
for reference in references {
    let resolved = self.resolve_table_ref(reference);
    if let Entry::Vacant(v) = provider.tables.entry(resolved) {
        let resolved = v.key();
        if let Ok(schema) = self.schema_for_ref(resolved.clone()) {
            if let Some(table) = schema.table(&resolved.table).await? {
                v.insert(provider_as_source(table));
            }
        }
    }
}
Step3: 基於 SessionContextProvider 創建 SqlToRel,SqlToRel 是 DataFusion 的語義分析器,可以檢查 SQL 語法或語意是否符合規則,最後就會將 AST 轉換成 logical plan
let query = SqlToRel::new_with_options(&provider, self.get_parser_options());
        query.statement_to_plan(statement)
DataFusion 提供了完整的範例程式碼,位於 datafusion-examples/examples/ :
sql_frontend.rs: 如何將 SQL 轉換為 logical planplan_to_sql.rs: 如何將 logical plan 反向轉換為 SQLsql_analysis.rs: 如何分析 SQL 查詢結構大家不妨實際執行這些範例:
git clone https://github.com/apache/datafusion
cd datafusion/datafusion-examples
cargo run --example sql_frontend
今天我們揭開了 DataFusion 查詢執行的神秘面紗,通過閱讀實際原始碼理解了從 SQL 字串到 logical plan 的轉換機制。明天我們將探討 logical plan 優化,我們會看到 DataFusion 的優化器如何將初始計畫改寫為更高效的版本,我們拭目以待!